package es

import (
	"code.simon.com/data-agent/common"
	"code.simon.com/data-agent/conf"
	"context"
	"github.com/olivere/elastic/v7"
	"github.com/sirupsen/logrus"
	"strings"
	"sync"
)

var (
	taskManager taskManage
	once        sync.Once
)

type task struct {
	client *elastic.Client
	ctx    context.Context
	Cancel context.CancelFunc
}

type taskManage struct {
	msrTask map[string]*task
}

// GetTaskManager  得到es的任务管理者-单例模式
func GetTaskManager() *taskManage {
	once.Do(func() {
		taskManager = taskManage{
			make(map[string]*task),
		}
	})
	return &taskManager
}
func (taskManage *taskManage) InitSource() {
	common.HandleNewError("elastic dont implement the source")
}

func (taskManage *taskManage) InitTarget() {
	// 初始化一个es的task 并注册到msrTask
	tt := &task{}
	taskManage.msrTask["source"] = tt
	var err error
	tt.client, err = elastic.NewClient(elastic.SetURL(conf.GetConfigObj().EsConfig.TargetIp))
	common.HandleError(err)
	info, code, err := tt.client.Ping(conf.GetConfigObj().EsConfig.TargetIp).Do(context.Background())
	if err != nil {
		panic(err)
	}
	logrus.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
	go SendMsg(tt.client)
}

func SendMsg(client *elastic.Client) {

	for {
		select {
		case msg := <-common.MidChannel:
			data := string(*msg.Value)
			dataInfo := strings.Split(data, " ")
			json := "{"
			format := strings.Split(conf.ConfigObj.EsConfig.TargetFormat, "-")
			formatLen := len(format)
			for index, item := range format {
				if index != formatLen-1 {
					json += `"` + item + `":"` + dataInfo[index] + `"`
					json += ", "
				} else {
					i := dataInfo[index:]
					endMsg := ""
					for _, m := range i {
						endMsg += m
					}
					endMsg = strings.ReplaceAll(endMsg, "\"", "'")
					endMsg = strings.ReplaceAll(endMsg, "\r", "")
					json += `"` + item + `":"` + endMsg + `"`
					json += "}"
				}
			}
			logrus.Printf(json)
			put, err := client.Index().
				Index(conf.ConfigObj.EsConfig.TargetIndex).
				Type(conf.ConfigObj.EsConfig.TargetType).
				BodyJson(json).
				Do(context.Background())
			if err != nil {
				panic(err)
			}
			logrus.Printf("Indexed tweet %s to index s%s, type %s\n", put.Id, put.Index, put.Type)

		}
	}

}
